package streamAPI.project.activityselectname;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import util.FlinkConstant;
import util.FlinkConstant._MySQL;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Maps a comma-separated input line to an {@link ActivityInfo}, enriching it with the
 * activity name looked up from the {@code t_activities} table.
 *
 * <p>Expected field order of the input line: {@code uid,aid,time,eventType,province}.
 * A JDBC connection is opened in {@link #open(Configuration)}, used for one lookup per
 * record in {@link #map(String)}, and released in {@link #close()}.
 *
 * @author yue.cao
 * @since 10-19-2020
 */
public class LineToActivityInfo extends RichMapFunction<String, ActivityInfo> {

	/** Lookup of the activity name by activity id. */
	private static final String SELECT_ACTIVITY_NAME_SQL = "SELECT a_name FROM t_activities WHERE id = ?";

	/**
	 * JDBC connection created in {@link #open(Configuration)}. Marked transient because it is
	 * a runtime resource and must never be part of the serialized function instance.
	 */
	private transient Connection connection = null;

	/**
	 * Opens the JDBC connection used by {@link #map(String)}.
	 *
	 * @param parameters configuration passed through to the superclass
	 * @throws Exception if the superclass initialization or the connection attempt fails
	 */
	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		connection = DriverManager.getConnection(_MySQL.URL, _MySQL.USER_NAME, _MySQL.PASSWORD);
	}

	/**
	 * Parses one input line and resolves the activity name for its activity id.
	 *
	 * @param line comma-separated record: {@code uid,aid,time,eventType,province}
	 * @return the parsed record; its activity name is {@code null} when no row matches the id
	 * @throws Exception if the line has too few fields, {@code eventType} is not an integer,
	 *                   or the database lookup fails
	 */
	@Override
	public ActivityInfo map(String line) throws Exception {
		String[] split = line.split(",");
		String uid = split[0];
		String aid = split[1];
		String time = split[2];
		Integer eventType = Integer.parseInt(split[3]);
		String province = split[4];

		String activityName = null;
		// try-with-resources guarantees the statement and result set are released even when
		// the query or row access throws; otherwise every failed record would leak a statement.
		try (PreparedStatement preparedStatement = connection.prepareStatement(SELECT_ACTIVITY_NAME_SQL)) {
			preparedStatement.setString(1, aid);
			try (ResultSet resultSet = preparedStatement.executeQuery()) {
				// If several rows match, the last one wins (same as iterating to the end).
				while (resultSet.next()) {
					activityName = resultSet.getString(1);
				}
			}
		}
		return new ActivityInfo(uid, aid, activityName, time, eventType, province);
	}

	/**
	 * Releases the JDBC connection.
	 *
	 * @throws Exception if the superclass cleanup or closing the connection fails
	 */
	@Override
	public void close() throws Exception {
		try {
			super.close();
		} finally {
			// connection is still null when open() failed before assigning it; the finally
			// block also ensures the connection is closed even if super.close() throws.
			if (connection != null) {
				connection.close();
				connection = null;
			}
		}
	}

}
